/* ******************************************************************************
* 2019 - present Contributed by Apulis Technology (Shenzhen) Co. LTD
*
* This program and the accompanying materials are made available under the
* terms of the MIT License, which is available at
* https://www.opensource.org/licenses/MIT
*
* See the NOTICE file distributed with this work for additional
* information regarding copyright ownership.
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*
* SPDX-License-Identifier: MIT
***************************************************************************** */
package jobscheduler

import (
	"encoding/json"
	"fmt"
	"net/url"
	"strings"
)

// Job describes a general workload handed to the scheduler: its identity and
// owner, the resource type it maps to, and the container settings to run.
type Job struct {
	ModId int    `json:"modId"`
	JobId string `json:"jobId"`
	Owner string `json:"owner"` // user name

	UserGroupId uint32       `json:"userGroupId"` // user group id
	ResType     ResourceType `json:"resType"`

	// Container data.
	ImageName string   `json:"imageName"`
	Cmd       []string `json:"cmd"`
	Namespace string   `json:"namespace"`

	Labels      map[string]string `json:"labels"`
	Envs        map[string]string `json:"envs"`
	MountPoints []MountPoint      `json:"mountPoints"`

	ArchType string        `json:"archType"`
	Quota    ResourceQuota `json:"quota"`

	// PreStartScripts selects the scripts executed before start:
	//   "*"       - exec all scripts
	//   empty     - don't exec scripts
	//   otherwise - exec the scripts (separated by space) listed in this field
	PreStartScripts string `json:"preStartScripts"`

	// Ext carries additional fields; which ones apply depends on ResType.
	Ext map[RESOURCE_FIELD_NAME]interface{} `json:"ext"`

	// Containers of the job.
	Containers []Container `json:"containers"`

	// InitContainer is the job's init-container.
	InitContainer *Container `json:"initContainer"`
}

// NewJob returns a Job whose Cmd, Labels, Envs, MountPoints and Ext are
// empty but non-nil, and whose InitContainer comes from NewContainer.
// Every other field keeps its zero value.
func NewJob() *Job {
	job := new(Job)
	job.Cmd = []string{}
	job.Labels = map[string]string{}
	job.Envs = map[string]string{}
	job.MountPoints = []MountPoint{}
	job.Ext = map[RESOURCE_FIELD_NAME]interface{}{}
	job.InitContainer = NewContainer()
	return job
}

// Valid reports whether the job carries the fields it needs. The second
// result describes the first problem found and is empty for a valid job.
//
// Checks, in order: a non-zero module id, a resource type inside
// [RESOURCE_TYPE_POD, RESOURCE_TYPE_UNKONWN), a non-empty namespace and, for
// deployments, a readable replicas entry in Ext.
func (j *Job) Valid() (bool, string) {
	if j.GetModId() == 0 {
		return false, fmt.Sprintf("invalid module id: %d", j.GetModId())
	}

	resType := j.GetResourceType()
	if resType < RESOURCE_TYPE_POD || resType >= RESOURCE_TYPE_UNKONWN {
		// Name the offending resource type; the module id was already
		// accepted by the check above.
		return false, fmt.Sprintf("invalid resource type: %v", resType)
	}

	//if len(j.GetImage()) == 0 {
	//	return false, fmt.Sprintf("empty image name")
	//}

	if len(j.GetNamespace()) == 0 {
		return false, "empty namespace "
	}

	if resType == RESOURCE_TYPE_DEPLOYMENT {
		if _, err := j.GetReplicas(); err != nil {
			return false, err.Error()
		}
	}

	return true, ""
}

// SetModId stores the module id and returns the job so calls can be chained.
func (j *Job) SetModId(modid int) *Job {
	j.ModId = modid
	return j
}

// GetModId returns the module id of the job.
func (j *Job) GetModId() int {
	id := j.ModId
	return id
}

// SetJobId stores the job id and returns the job so calls can be chained.
func (j *Job) SetJobId(jobId string) *Job {
	j.JobId = jobId
	return j
}

// GetJobId returns the job id.
func (j *Job) GetJobId() string {
	id := j.JobId
	return id
}

// SetUserGroupId stores the user group id and returns the job so calls can
// be chained.
func (j *Job) SetUserGroupId(groupId uint32) *Job {
	j.UserGroupId = groupId
	return j
}

// GetUserGroupId returns the user group id of the job.
func (j *Job) GetUserGroupId() uint32 {
	groupID := j.UserGroupId
	return groupID
}


// SetOwner stores the owner (user name) and returns the job so calls can be
// chained.
func (j *Job) SetOwner(owner string) *Job {
	j.Owner = owner
	return j
}

// GetOwner returns the owner (user name) of the job.
func (j *Job) GetOwner() string {
	owner := j.Owner
	return owner
}

// SetUserName writes the user name into the Owner field, the same field
// SetOwner writes, and returns the job so calls can be chained.
func (j *Job) SetUserName(user string) *Job {
	j.Owner = user
	return j
}

// GetUserName returns the Owner field, the same value GetOwner returns.
func (j *Job) GetUserName() string {
	user := j.Owner
	return user
}

// SetResType stores the resource type and returns the job so calls can be
// chained.
func (j *Job) SetResType(resType ResourceType) *Job {
	j.ResType = resType
	return j
}

// GetResourceType returns the resource type of the job.
func (j *Job) GetResourceType() ResourceType {
	kind := j.ResType
	return kind
}

// SetImage stores the image name and returns the job so calls can be chained.
func (j *Job) SetImage(image string) *Job {
	j.ImageName = image
	return j
}

// GetImage returns the image name of the job.
func (j *Job) GetImage() string {
	image := j.ImageName
	return image
}

// SetCmd stores the command slice as given (it is not copied) and returns
// the job so calls can be chained.
func (j *Job) SetCmd(cmd []string) *Job {
	j.Cmd = cmd
	return j
}

// GetCmd returns the command slice held by the job (not a copy).
func (j *Job) GetCmd() []string {
	cmd := j.Cmd
	return cmd
}

// SetNamespace stores the namespace and returns the job so calls can be
// chained.
func (j *Job) SetNamespace(ns string) *Job {
	j.Namespace = ns
	return j
}

// GetNamespace returns the namespace of the job.
func (j *Job) GetNamespace() string {
	ns := j.Namespace
	return ns
}

// SetLabels stores the label map as given (it is not copied) and returns the
// job so calls can be chained.
func (j *Job) SetLabels(labels map[string]string) *Job {
	j.Labels = labels
	return j
}

// GetLabels returns the label map held by the job (not a copy).
func (j *Job) GetLabels() map[string]string {
	labels := j.Labels
	return labels
}

// SetEnvs stores the environment map as given (it is not copied) and returns
// the job so calls can be chained.
func (j *Job) SetEnvs(envs map[string]string) *Job {
	j.Envs = envs
	return j
}

// GetEnvs returns the environment map held by the job (not a copy).
func (j *Job) GetEnvs() map[string]string {
	envs := j.Envs
	return envs
}

// SetMounts stores the mount point slice as given (it is not copied) and
// returns the job so calls can be chained.
func (j *Job) SetMounts(mounts []MountPoint) *Job {
	j.MountPoints = mounts
	return j
}

// GetMounts returns the mount point slice held by the job (not a copy).
func (j *Job) GetMounts() []MountPoint {
	mounts := j.MountPoints
	return mounts
}

// SetArchType stores the architecture type and returns the job so calls can
// be chained.
func (j *Job) SetArchType(at string) *Job {
	j.ArchType = at
	return j
}

// GetArchType returns the architecture type of the job.
func (j *Job) GetArchType() string {
	arch := j.ArchType
	return arch
}

// SetQuota stores the resource quota and returns the job so calls can be
// chained.
func (j *Job) SetQuota(quota ResourceQuota) *Job {
	j.Quota = quota
	return j
}

// GetQuota returns the resource quota of the job by value.
func (j *Job) GetQuota() ResourceQuota {
	quota := j.Quota
	return quota
}

// SetPreStartScripts stores the pre-start script selection and returns the
// job so calls can be chained.
func (j *Job) SetPreStartScripts(scripts string) *Job {
	j.PreStartScripts = scripts
	return j
}

// GetPreStartScripts returns the pre-start script selection of the job.
func (j *Job) GetPreStartScripts() string {
	scripts := j.PreStartScripts
	return scripts
}

// ToString returns the JSON encoding of the job. A marshalling error is
// discarded, in which case the result is the empty string.
func (j *Job) ToString() string {
	encoded, _ := json.Marshal(*j)
	return string(encoded)
}

// FromString decodes the JSON text in data into the job and returns the
// decoder's error, if any.
func (j *Job) FromString(data string) error {
	raw := []byte(data)
	return json.Unmarshal(raw, j)
}

// GetReplicas returns the replica count stored in Ext under FIELD_REPLICA.
//
// After a job is decoded from JSON the entry is a float64, but SetReplicas
// stores an int32 on the in-memory job, so both forms (and the other common
// numeric types) are accepted. An error is returned when the entry is missing
// or is not a number; the function never panics on an unexpected type.
func (j *Job) GetReplicas() (int32, error) {
	// Reading from a nil map is safe and simply reports "not present".
	data, ok := j.Ext[FIELD_REPLICA]
	if !ok {
		return 0, fmt.Errorf("replicas field not found!!")
	}

	switch count := data.(type) {
	case float64:
		return int32(count), nil
	case float32:
		return int32(count), nil
	case int32:
		return count, nil
	case int:
		return int32(count), nil
	case int64:
		return int32(count), nil
	case json.Number:
		f, err := count.Float64()
		if err != nil {
			return 0, fmt.Errorf("replicas field is not a number: %v", err)
		}
		return int32(f), nil
	default:
		return 0, fmt.Errorf("replicas field has unexpected type %T", data)
	}
}

// SetReplicas stores the replica count in Ext under FIELD_REPLICA, creating
// the Ext map first when it is nil.
func (j *Job) SetReplicas(replicas int32) {
	if j.Ext == nil {
		j.Ext = map[RESOURCE_FIELD_NAME]interface{}{}
	}
	j.Ext[FIELD_REPLICA] = replicas
}

// GetContainerPorts returns the container ports stored in Ext under
// FIELD_PORTS.
//
// Two representations are accepted: the []ContainerPort that
// SetContainerPorts stores on an in-memory job, and the []interface{} that
// results from decoding the job from JSON. In the latter case every element
// is re-encoded and decoded into a ContainerPort; elements that fail to
// convert are logged and skipped.
//
// The result is nil when the entry is absent or has any other type. The
// returned error is always nil.
func (j *Job) GetContainerPorts() ([]ContainerPort, error) {
	// Reading from a nil map is safe and simply reports "not present".
	data, ok := j.Ext[FIELD_PORTS]
	if !ok {
		return nil, nil
	}

	var items []interface{}
	switch v := data.(type) {
	case []ContainerPort:
		// Hand back a copy so callers cannot modify the stored slice.
		ports := make([]ContainerPort, len(v))
		copy(ports, v)
		fmt.Println("parsed ports: ", ports)
		return ports, nil
	case []interface{}:
		items = v
	}

	// Covers a nil []interface{}, a JSON null and any unexpected type.
	if items == nil {
		fmt.Println(j.JobId, "cann't convert ports field")
		return nil, nil
	}

	ports := make([]ContainerPort, 0, len(items))
	for _, port := range items {
		jsonString, err := json.Marshal(port)
		if err != nil {
			fmt.Printf("%s marshal err(%+v)\n", j.JobId, err)
			continue
		}

		containerPort := ContainerPort{}
		if err := json.Unmarshal(jsonString, &containerPort); err != nil {
			fmt.Printf("%s unmashall err(%+v)\n", j.JobId, err)
			continue
		}

		ports = append(ports, containerPort)
	}

	fmt.Println("parsed ports: ", ports)
	return ports, nil
}

// GetContainerPortsV2 gathers the ports of every entry in Containers, in
// container order, into one slice. The slice is empty (not nil) when no
// container reports a port. The returned error is always nil.
func (j *Job) GetContainerPortsV2() ([]ContainerPort, error) {
	collected := []ContainerPort{}

	for _, c := range j.Containers {
		// Appending an empty result leaves the slice unchanged.
		collected = append(collected, c.GetPorts()...)
	}

	fmt.Println("parsed ports: ", collected)
	return collected, nil
}

// SetContainerPorts stores the port list in Ext under FIELD_PORTS, creating
// the Ext map first when it is nil. The slice is stored as given.
func (j *Job) SetContainerPorts(ports []ContainerPort) {
	if j.Ext == nil {
		j.Ext = map[RESOURCE_FIELD_NAME]interface{}{}
	}
	j.Ext[FIELD_PORTS] = ports
}

// GetAffinity decodes the affinity stored in Ext under FIELD_AFFINITY.
//
// The entry is expected to be a JSON string, which is what SetAffinity
// stores. The result is nil when the entry is absent, is not a string, or
// cannot be decoded; the last two cases are logged.
func (j *Job) GetAffinity() *JobAffinity {
	// Reading from a nil map is safe and simply reports "not present".
	data, ok := j.Ext[FIELD_AFFINITY]
	if !ok {
		return nil
	}

	// Ext may come from decoded external input, so do not assume the type.
	dataAsString, ok := data.(string)
	if !ok {
		fmt.Println(j.JobId, "can't unmarshal from ext")
		return nil
	}

	affinity := &JobAffinity{}
	if err := json.Unmarshal([]byte(dataAsString), affinity); err != nil {
		fmt.Println(j.JobId, "can't unmarshal from ext")
		return nil
	}

	fmt.Println("=============================")
	fmt.Println("affinity:", MustMarshalString(affinity))
	fmt.Println("=============================")

	return affinity
}

// SetAffinity stores the affinity in Ext under FIELD_AFFINITY as the string
// produced by MustMarshalString, creating the Ext map first when it is nil.
func (j *Job) SetAffinity(jobAffinity *JobAffinity) {
	if j.Ext == nil {
		j.Ext = map[RESOURCE_FIELD_NAME]interface{}{}
	}
	j.Ext[FIELD_AFFINITY] = MustMarshalString(jobAffinity)
}

// GetTolerations decodes the tolerations stored in Ext under FIELD_TOLERATE.
//
// The entry is expected to be a JSON string, which is what SetTolerations
// stores. The result is nil when the entry is absent, is not a string, or
// cannot be decoded; the last two cases are logged.
func (j *Job) GetTolerations() []Toleration {
	// Reading from a nil map is safe and simply reports "not present".
	data, ok := j.Ext[FIELD_TOLERATE]
	if !ok {
		return nil
	}

	// Ext may come from decoded external input, so do not assume the type.
	dataAsString, ok := data.(string)
	if !ok {
		fmt.Println(j.JobId, "can't unmarshal from ext")
		return nil
	}

	tolerations := make([]Toleration, 0)
	if err := json.Unmarshal([]byte(dataAsString), &tolerations); err != nil {
		fmt.Println(j.JobId, "can't unmarshal from ext")
		return nil
	}

	fmt.Println("=============================")
	fmt.Println("tolerations:", MustMarshalString(tolerations))
	fmt.Println("=============================")

	return tolerations
}

// SetTolerations stores the tolerations in Ext under FIELD_TOLERATE as the
// string produced by MustMarshalString, creating the Ext map first when it
// is nil.
func (j *Job) SetTolerations(tolerations []Toleration) {
	if j.Ext == nil {
		j.Ext = map[RESOURCE_FIELD_NAME]interface{}{}
	}
	j.Ext[FIELD_TOLERATE] = MustMarshalString(tolerations)
}

// GetNodeSelector decodes the node selector stored in Ext under
// FIELD_NODE_SELECTOR.
//
// The entry is expected to be a JSON string, which is what SetNodeSelector
// stores. The result is nil when the entry is absent, is not a string, or
// cannot be decoded; the last two cases are logged.
func (j *Job) GetNodeSelector() map[string]string {
	// Reading from a nil map is safe and simply reports "not present".
	data, ok := j.Ext[FIELD_NODE_SELECTOR]
	if !ok {
		return nil
	}

	// Ext may come from decoded external input, so do not assume the type.
	dataAsString, ok := data.(string)
	if !ok {
		fmt.Println(j.JobId, "can't unmarshal from ext")
		return nil
	}

	selectors := make(map[string]string)
	if err := json.Unmarshal([]byte(dataAsString), &selectors); err != nil {
		fmt.Println(j.JobId, "can't unmarshal from ext")
		return nil
	}

	fmt.Println("=============================")
	fmt.Println("selectors:", MustMarshalString(selectors))
	fmt.Println("=============================")

	return selectors
}

// SetNodeSelector stores the selector in Ext under FIELD_NODE_SELECTOR as the
// string produced by MustMarshalString. The Ext map is created when it is
// nil; a nil selector leaves the entry untouched.
func (j *Job) SetNodeSelector(selector map[string]string) {
	if j.Ext == nil {
		j.Ext = map[RESOURCE_FIELD_NAME]interface{}{}
	}

	if selector != nil {
		j.Ext[FIELD_NODE_SELECTOR] = MustMarshalString(selector)
	}
}

// GetAnnotation decodes the annotations stored in Ext under FIELD_ANNOTATION.
//
// The entry is expected to be a JSON string, which is what SetAnnotation
// stores. The result is nil when the entry is absent, is not a string, or
// cannot be decoded; the last two cases are logged.
func (j *Job) GetAnnotation() map[string]string {
	// Reading from a nil map is safe and simply reports "not present".
	data, ok := j.Ext[FIELD_ANNOTATION]
	if !ok {
		return nil
	}

	// Ext may come from decoded external input, so do not assume the type.
	dataAsString, ok := data.(string)
	if !ok {
		fmt.Println(j.JobId, "can't unmarshal from ext")
		return nil
	}

	annotation := make(map[string]string)
	if err := json.Unmarshal([]byte(dataAsString), &annotation); err != nil {
		fmt.Println(j.JobId, "can't unmarshal from ext")
		return nil
	}

	fmt.Println("=============================")
	fmt.Println("annotation:", MustMarshalString(annotation))
	fmt.Println("=============================")

	return annotation
}

// SetAnnotation stores the annotations in Ext under FIELD_ANNOTATION as the
// string produced by MustMarshalString. The Ext map is created when it is
// nil; a nil annotation map leaves the entry untouched.
func (j *Job) SetAnnotation(annotation map[string]string) {
	if j.Ext == nil {
		j.Ext = map[RESOURCE_FIELD_NAME]interface{}{}
	}

	if annotation != nil {
		j.Ext[FIELD_ANNOTATION] = MustMarshalString(annotation)
	}
}

// DisableIstio sets the "sidecar.istio.io/inject" annotation to "false",
// keeping whatever annotations GetAnnotation already returns, and writes the
// result back through SetAnnotation.
func (j *Job) DisableIstio() {
	merged := j.GetAnnotation()
	if merged == nil {
		merged = map[string]string{}
	}
	merged["sidecar.istio.io/inject"] = "false"

	j.SetAnnotation(merged)
}


// GetAffinityLabels returns a fresh single-entry map that maps
// "task-affinity" to the job id.
func (j *Job) GetAffinityLabels() map[string]string {
	labels := make(map[string]string, 1)
	labels["task-affinity"] = j.GetJobId()
	return labels
}


// GetYamlContent returns the YAML text stored in Ext under FIELD_YAML.
// An error is returned when the entry is missing or is not a string.
func (j *Job) GetYamlContent() (string, error) {
	// Reading from a nil map is safe and simply reports "not present".
	data, ok := j.Ext[FIELD_YAML]
	if !ok {
		return "", fmt.Errorf("content field not found!!")
	}

	// Report an unexpected type through the error result instead of
	// panicking on the assertion.
	content, ok := data.(string)
	if !ok {
		return "", fmt.Errorf("content field has unexpected type %T", data)
	}

	return content, nil
}

// SetYamlContent stores the YAML text in Ext under FIELD_YAML, creating the
// Ext map first when it is nil.
func (j *Job) SetYamlContent(content string) {
	if j.Ext == nil {
		j.Ext = map[RESOURCE_FIELD_NAME]interface{}{}
	}
	j.Ext[FIELD_YAML] = content
}

// NeedSetupEnv reports whether PreStartScripts is non-empty.
func (j *Job) NeedSetupEnv() bool {
	return len(j.PreStartScripts) > 0
}

// IsKubeJob reports whether the resource type is RESOURCE_TYPE_JOB.
func (j *Job) IsKubeJob() bool {
	return j.ResType == RESOURCE_TYPE_JOB
}

// IsKubeDeployment reports whether the resource type is
// RESOURCE_TYPE_DEPLOYMENT.
func (j *Job) IsKubeDeployment() bool {
	return j.ResType == RESOURCE_TYPE_DEPLOYMENT
}

// GetInitContainer returns the init-container pointer held by the job.
func (j *Job) GetInitContainer() *Container {
	initC := j.InitContainer
	return initC
}

// SetInitContainer replaces the init-container pointer held by the job.
func (j *Job) SetInitContainer(c *Container) {
	j.InitContainer = c
}

// GetComputeDeviceName returns the device type named in the quota. The
// request section is consulted first and the limit section second; a section
// counts only when both its DeviceType and DeviceNum are non-empty. The
// result is "" when neither section qualifies.
func (j *Job) GetComputeDeviceName() string {
	quota := j.GetQuota()

	requested := quota.Request.Device
	if requested.DeviceType != "" && len(requested.DeviceNum) > 0 {
		return requested.DeviceType
	}

	limited := quota.Limit.Device
	if limited.DeviceType != "" && len(limited.DeviceNum) > 0 {
		return limited.DeviceType
	}

	return ""
}

// GetContainers returns the container slice held by the job (not a copy).
func (j *Job) GetContainers() []Container {
	containers := j.Containers
	return containers
}

// AddContainer appends c to the job's container list.
func (j *Job) AddContainer(c Container) {
	j.Containers = append(j.Containers, c)
}

// MountPoint describes one volume mount. Path is a URI whose scheme selects
// the volume source (see https://en.wikipedia.org/wiki/File_URI_scheme#Unix):
//
//	hostpath  - file:///hostpath
//	pvc       - pvc://pvc-name/subpath
//	configmap - cm://configmap-name/subpath
//	emptyDir  - emptydir://dir-name
//
// For example, /etc/fstab can be written as file://localhost/etc/fstab or
// file:///etc/fstab.
type MountPoint struct {
	Path          string `json:"path"`
	ContainerPath string `json:"containerPath"`
	ReadOnly      bool   `json:"readOnly"`
}

// IsHostPath reports whether Path starts with HOSTPATH_PREFIX or
// HOSTPATH_LOWER_PREFIX.
func (m *MountPoint) IsHostPath() bool {
	return strings.HasPrefix(m.Path, HOSTPATH_PREFIX) ||
		strings.HasPrefix(m.Path, HOSTPATH_LOWER_PREFIX)
}

// IsPVC reports whether Path starts with PVC_PREFIX.
func (m *MountPoint) IsPVC() bool {
	return strings.HasPrefix(m.Path, PVC_PREFIX)
}

// IsEmptyDir reports whether Path starts with EMPTY_DIR_PREFIX.
func (m *MountPoint) IsEmptyDir() bool {
	return strings.HasPrefix(m.Path, EMPTY_DIR_PREFIX)
}

// GetPVC parses Path as a URL and returns its host as the claim name and its
// path, without the leading "/", as the sub path. Both results are empty
// when Path cannot be parsed; the parse error is printed.
func (m *MountPoint) GetPVC() (name, subPath string) {
	parsed, err := url.Parse(m.Path)
	if err != nil {
		fmt.Printf("parse pvc err: %v", err)
		return "", ""
	}

	name = parsed.Host
	subPath = strings.TrimPrefix(parsed.Path, "/")
	return name, subPath
}

// IsConfigMap reports whether Path starts with CM_PREFIX or CM_LOWER_PREFIX.
func (m *MountPoint) IsConfigMap() bool {
	return strings.HasPrefix(m.Path, CM_PREFIX) ||
		strings.HasPrefix(m.Path, CM_LOWER_PREFIX)
}

// GetConfigMap parses Path as a URL and returns its host as the config map
// name and its path, without the leading "/", as the sub path. Both results
// are empty when Path cannot be parsed; the parse error is printed.
func (m *MountPoint) GetConfigMap() (name, subPath string) {
	parsed, err := url.Parse(m.Path)
	if err != nil {
		fmt.Printf("parse configmap err: %v", err)
		return "", ""
	}

	name = parsed.Host
	subPath = strings.TrimPrefix(parsed.Path, "/")
	return name, subPath
}

// GetHostPath returns Path with its host-path prefix removed:
// HOSTPATH_PREFIX when Path starts with it, HOSTPATH_LOWER_PREFIX otherwise.
// Path is returned unchanged when it carries neither prefix.
func (m *MountPoint) GetHostPath() (path string) {
	prefix := HOSTPATH_LOWER_PREFIX
	if strings.HasPrefix(m.Path, HOSTPATH_PREFIX) {
		prefix = HOSTPATH_PREFIX
	}
	return strings.TrimPrefix(m.Path, prefix)
}

// GetEmptyDirName parses Path as a URL and returns its host as the emptyDir
// volume name. The result is empty when Path cannot be parsed; the parse
// error is printed.
func (m *MountPoint) GetEmptyDirName() (name string) {
	// The local is not called "url" so the net/url package stays visible.
	parsed, err := url.Parse(m.Path)
	if err != nil {
		// Label the failure as an emptydir problem, not a pvc one.
		fmt.Printf("parse emptydir err: %v", err)
		return ""
	}

	return parsed.Host
}

// GetPath returns the raw Path URI of the mount point.
func (m *MountPoint) GetPath() string {
	raw := m.Path
	return raw
}

// JobBase holds the identity and ownership fields of a job.
type JobBase struct {
	// Common data.
	ModId   int          `json:"modId"`
	JobId   string       `json:"jobId"`
	ResType ResourceType `json:"resType"`

	// Owner data.
	Owner       string `json:"owner"`       // user name
	UserGroupId uint32 `json:"userGroupId"` // user group id
	Namespace   string `json:"namespace"`   // k8s namespace
}

// GetNamespace returns the namespace recorded in the job base.
func (j *JobBase) GetNamespace() string {
	ns := j.Namespace
	return ns
}

// TritonPredictor is a Container (embedded) extended with a storage URI and
// a runtime version.
type TritonPredictor struct {
	StorageURI     string `json:"storageUri"`
	RuntimeVersion string `json:"runtimeVersion"`
	Container
}

// InferenceJob describes an inference workload: the common job data, an
// optional triton predictor, a transformer container and annotations.
type InferenceJob struct {
	// Common data.
	JobBase JobBase `json:"jobBase"`

	// Triton predictor.
	Triton *TritonPredictor `json:"tritonPredictor"`

	// Transformer.
	Transformer Container `json:"transformer"`

	// Annotation.
	Annotation map[string]string `json:"annotation"`
}

// NewInferenceJob returns an InferenceJob with every field at its zero
// value: an empty JobBase and Transformer, a nil Triton predictor and a nil
// Annotation map.
func NewInferenceJob() *InferenceJob {
	return new(InferenceJob)
}

// Validate checks the fields an inference job must carry and returns an
// error describing the first problem found, or nil when the job is valid.
//
// Checks, in order: a non-zero module id, a resource type inside
// [RESOURCE_TYPE_POD, RESOURCE_TYPE_UNKONWN), and a non-empty transformer
// image name.
func (i *InferenceJob) Validate() error {
	if i.JobBase.ModId == 0 {
		return fmt.Errorf("invalid module id: %d", i.JobBase.ModId)
	}

	resType := i.JobBase.ResType
	if resType < RESOURCE_TYPE_POD || resType >= RESOURCE_TYPE_UNKONWN {
		// Name the field that actually failed; the module id was already
		// accepted by the check above.
		return fmt.Errorf("invalid resource type: %v", resType)
	}

	if len(i.Transformer.ImageName) == 0 {
		return fmt.Errorf("empty image name")
	}

	return nil
}

// GetJobId returns the job id recorded in the job base.
func (i *InferenceJob) GetJobId() string {
	base := &i.JobBase
	return base.JobId
}

// GetModId returns the module id recorded in the job base.
func (i *InferenceJob) GetModId() int {
	base := &i.JobBase
	return base.ModId
}

// SetResType stores the resource type in the job base.
func (i *InferenceJob) SetResType(resType ResourceType) {
	base := &i.JobBase
	base.ResType = resType
}

// SetJobId stores the job id in the job base.
func (i *InferenceJob) SetJobId(jobId string) {
	base := &i.JobBase
	base.JobId = jobId
}

// ToString returns the JSON encoding of the inference job. A marshalling
// error is discarded, in which case the result is the empty string.
func (i *InferenceJob) ToString() string {
	encoded, _ := json.Marshal(*i)
	return string(encoded)
}

// FromString decodes the JSON text in data into the inference job and
// returns the decoder's error, if any.
func (i *InferenceJob) FromString(data string) error {
	raw := []byte(data)
	return json.Unmarshal(raw, i)
}

// GetNamespace returns the namespace recorded in the job base.
func (i *InferenceJob) GetNamespace() string {
	base := &i.JobBase
	return base.Namespace
}

// SetAnnotation replaces the annotation map; the map is stored as given.
func (i *InferenceJob) SetAnnotation(annotation map[string]string) {
	i.Annotation = annotation
}

// GetAnnotation returns the annotation map held by the job (not a copy).
func (i *InferenceJob) GetAnnotation() map[string]string {
	annotation := i.Annotation
	return annotation
}

// JobContext bundles one job with its current state and its kubernetes
// object. resType records which of the job pointers is the one in use.
type JobContext struct {
	resType ResourceType

	generalJob     *Job
	InferenceJob   *InferenceJob
	sparkJob       *SparkJob
	distributedJob *DistributedJob

	currentState  *JobState
	kubeResObject KubeResObject // one of batchv1.Job, Pod or deployment
}

// NewJobContext builds a context for jobData, which must be the pointer type
// that matches resType: *InferenceJob, *SparkJob or *DistributedJob for the
// corresponding resource types, and *Job for every other type. The type
// assertion is unchecked, so a mismatched jobData panics.
func NewJobContext(resType ResourceType, jobData interface{}) *JobContext {
	cntx := &JobContext{resType: resType}

	switch resType {
	case RESOURCE_TYPE_INFERENCE_SERVICE:
		cntx.InferenceJob = jobData.(*InferenceJob)
	case RESOURCE_TYPE_SPARK_APP:
		cntx.sparkJob = jobData.(*SparkJob)
	case RESOURCE_TYPE_DISTRIBUTED_JOB:
		cntx.distributedJob = jobData.(*DistributedJob)
	default:
		cntx.generalJob = jobData.(*Job)
	}

	return cntx
}

// SetJobState replaces the current job state pointer.
func (j *JobContext) SetJobState(state *JobState) {
	j.currentState = state
}

// GetJobState returns the current job state pointer.
func (j *JobContext) GetJobState() *JobState {
	state := j.currentState
	return state
}

// SetKubeObject replaces the kubernetes object attached to the context.
func (j *JobContext) SetKubeObject(obj KubeResObject) {
	j.kubeResObject = obj
}

// GetKubeObject returns the kubernetes object attached to the context.
func (j *JobContext) GetKubeObject() KubeResObject {
	obj := j.kubeResObject
	return obj
}

// GetJobData returns the job pointer selected by the context's resource
// type. The pointer is wrapped in an interface value, so the result compares
// unequal to nil even when the selected pointer itself is nil.
func (j *JobContext) GetJobData() interface{} {
	switch j.GetResType() {
	case RESOURCE_TYPE_INFERENCE_SERVICE:
		return j.InferenceJob
	case RESOURCE_TYPE_SPARK_APP:
		return j.sparkJob
	case RESOURCE_TYPE_DISTRIBUTED_JOB:
		return j.distributedJob
	default:
		return j.generalJob
	}
}

// GetNamespace returns the namespace of the job selected by the context's
// resource type.
func (j *JobContext) GetNamespace() string {
	switch j.GetResType() {
	case RESOURCE_TYPE_INFERENCE_SERVICE:
		return j.InferenceJob.GetNamespace()
	case RESOURCE_TYPE_SPARK_APP:
		return j.sparkJob.GetNamespace()
	case RESOURCE_TYPE_DISTRIBUTED_JOB:
		return j.distributedJob.GetNamespace()
	default:
		return j.generalJob.GetNamespace()
	}
}

// GetResType returns the resource type the context was created with.
func (j *JobContext) GetResType() ResourceType {
	kind := j.resType
	return kind
}

// GetJobId returns the id of the job selected by the context's resource
// type.
func (j *JobContext) GetJobId() string {
	switch j.GetResType() {
	case RESOURCE_TYPE_INFERENCE_SERVICE:
		return j.InferenceJob.GetJobId()
	case RESOURCE_TYPE_SPARK_APP:
		return j.sparkJob.GetJobId()
	case RESOURCE_TYPE_DISTRIBUTED_JOB:
		return j.distributedJob.GetJobId()
	default:
		return j.generalJob.GetJobId()
	}
}

// ToString returns the ToString result of the job selected by the context's
// resource type.
func (j *JobContext) ToString() string {
	switch j.GetResType() {
	case RESOURCE_TYPE_INFERENCE_SERVICE:
		return j.InferenceJob.ToString()
	case RESOURCE_TYPE_SPARK_APP:
		return j.sparkJob.ToString()
	case RESOURCE_TYPE_DISTRIBUTED_JOB:
		return j.distributedJob.ToString()
	default:
		return j.generalJob.ToString()
	}
}

// GetModId returns the module id of the job selected by the context's
// resource type.
func (j *JobContext) GetModId() int {
	switch j.GetResType() {
	case RESOURCE_TYPE_INFERENCE_SERVICE:
		return j.InferenceJob.GetModId()
	case RESOURCE_TYPE_SPARK_APP:
		return j.sparkJob.GetModId()
	case RESOURCE_TYPE_DISTRIBUTED_JOB:
		return j.distributedJob.GetModId()
	default:
		return j.generalJob.GetModId()
	}
}



// KubeResObject is the view of a kubernetes resource that a JobContext
// holds.
type KubeResObject interface {
	// GetJobState returns the job state of the object.
	GetJobState() *JobState

	// GetKubeMsg returns a message string for the object.
	GetKubeMsg() string

	//GetKubeObjectStatus()

	// GetJobId returns the job-id from the labels, or the object name if no
	// job-id label was set.
	GetJobId() string
}

// ContainerPort describes one port of a container together with the name
// and type of the service associated with it. PortName is omitted from the
// JSON encoding when it is empty.
type ContainerPort struct {
	Port        int    `json:"port"`
	TargetPort  int    `json:"targetPort"`
	HostPort    int    `json:"hostPort"`
	PortName    string `json:"portName,omitempty"`
	ServiceName string `json:"serviceName"`
	ServiceType string `json:"serviceType"`
}

// JobEnv carries the pre-start script selection of a job.
type JobEnv struct {
	// PreStartScripts selects the scripts executed before start:
	//   "*"       - exec all scripts
	//   empty     - don't exec scripts
	//   otherwise - exec the scripts (separated by space) within this field
	PreStartScripts string `json:"preStartScripts"`
}

// NewJobEnv returns a JobEnv holding the given pre-start script selection.
func NewJobEnv(prestartScripts string) *JobEnv {
	env := new(JobEnv)
	env.PreStartScripts = prestartScripts
	return env
}

// NeedSetupEnvForJob reports whether PreStartScripts is non-empty.
func (j *JobEnv) NeedSetupEnvForJob() bool {
	return len(j.PreStartScripts) > 0
}

// GetPreStartScripts returns the pre-start script selection.
func (j *JobEnv) GetPreStartScripts() string {
	scripts := j.PreStartScripts
	return scripts
}